// Copyright (c) 2020-present, INSPUR Co, Ltd. All rights reserved.
// This source code is licensed under Apache 2.0 License.

#pragma once
#include <assert.h>
#include <stdlib.h>

#include <algorithm>
#include <atomic>
#include <iostream>

#include "pure_mem/encoded_version_node.h"
#include "pure_mem/epoche.h"
#include "pure_mem/key_index/abstract_uk_index.h"
#include "pure_mem/key_index/userkey_index.h"
#include "pure_mem/mvcc_key.h"
#include "pure_mem/uncoded_version_node.h"
#include "rocksdb/slice.h"
#include "util/allocator.h"
#include "util/coding.h"
#include "util/random.h"

namespace rocksdb
{

  /**
   * Wrapper around the memtable's user-key index. It exposes the same
   * interface as InlineSkipList and similar memtable containers.
   * In addition, the constructor takes a UserKeyIndexType that selects the
   * index algorithm to use.
   */
  template <class Comparator>
  class InlineUserKeyIndex
  {
  public:
    using DecodedKey =
        typename std::remove_reference<Comparator>::type::DecodedType;

    // Mutex taken by Insert() while it adds an entry to hashMap_.
    std::mutex mtx_;
    // Hash index from a user key's bytes to the ITreeVoidRef that Insert()
    // obtained for that user key. Consulted by the Hash* lookup functions
    // before they fall back to userKeyIndex_.
    std::unordered_map<std::string, ITreeVoidRef *> hashMap_;

    // Create a new InlineUserKeyIndex that uses "cmp" for comparing keys and
    // builds its underlying index from "type" and "refType". "logger" is only
    // used for diagnostics and may be nullptr.
    explicit InlineUserKeyIndex(const Comparator cmp, UserKeyIndexType type,
        ITreeVoidRef::RefType refType = ITreeVoidRef::RefType::REFTYPE1, Logger *logger = nullptr);

    // Releases the underlying index created by the constructor.
    ~InlineUserKeyIndex()
    {
      delete userKeyIndex_;
    }

    // Present only for interface compatibility with the skip-list memtable:
    // the implementation asserts and returns nullptr.
    char *AllocateKey(size_t key_size);

    // Inserts the version node that "key" points to ("key" is reinterpreted
    // as a VersionNode*).
    //
    // Returns false if a node comparing equal is already stored under the
    // same user key, true otherwise.
    bool Insert(const char *key);

    // Same as Insert(); "hint" is accepted for interface compatibility and is
    // not used.
    bool InsertWithHint(const char *key, void **hint);

    // Same as Insert(); simply forwards to it.
    bool InsertConcurrently(const char *key);

    // Returns true iff an entry that compares equal to key is in the index.
    bool Contains(const char *key) const;

    // Return estimated number of entries smaller than `key`.
    // Not implemented for this index: always returns 0.
    uint64_t EstimateCount(const char * /*key*/) const { return 0; }

    // Iteration over the contents of the index
    class Iterator
    {
    public:
      // Initialize an iterator over the specified index.
      // The returned iterator is not valid.
      explicit Iterator(const InlineUserKeyIndex *list);

      // Change the underlying index used for this iterator.
      // This allows reusing the iterator object instead of deallocating
      // an old one and then allocating a new one. The iterator is not valid
      // afterwards.
      void SetList(const InlineUserKeyIndex *list);

      // Returns true iff the iterator is positioned at a valid node.
      bool Valid() const;

      // Returns the key at the current position.
      // REQUIRES: Valid()
      const char *key() const;

      // Returns the version node at the current position.
      // REQUIRES: Valid()
      VersionNode *node() const;

      // Returns the per-user-key reference the current node belongs to.
      // REQUIRES: Valid()
      ITreeVoidRef *userKeyRef()
      {
        assert(Valid());
        return artRef_;
      }

      // Advances to the next position.
      // REQUIRES: Valid()
      void Next();

      // Advances to the previous position.
      // REQUIRES: Valid()
      void Prev();

      // Advance to the first entry with a key >= target
      void Seek(const char *target);

      void Seek(const Slice &key);

      void Seek(const VersionNode&);

      // Like Seek(), but the user key is first looked up in the hash index
      // and only then in the underlying index.
      void HashSeek(const char *target);
      void HashSeek(const Slice &key);

      // Unlinks the current node from its content list, advances the
      // iterator, and returns the unlinked node (nullptr if the node could
      // not be found in the list).
      void *PushCurNode();

      // Retreat to the last entry with a key <= target
      void SeekForPrev(const char *target);
      void HashSeekForPrev(const char *target);
      void HashSeekForPrev(const Slice &target);
      void HashSeekForPrevOrEqual(const Slice &target);

      // Position at the first entry in the index.
      // Final state of iterator is Valid() iff the index is not empty.
      void SeekToFirst();

      // Position at the last entry in the index.
      // Final state of iterator is Valid() iff the index is not empty.
      void SeekToLast();

    private:
      const InlineUserKeyIndex *list_;  // index being iterated
      VersionNode *node_;               // current version node
      ITreeVoidRef *artRef_;            // reference whose list holds node_
    };

    // Returns a heap-allocated iterator over this index; the object is
    // created with `new` and is not tracked by this class.
    InlineUserKeyIndex::Iterator *GetIterator()
    {
      return new InlineUserKeyIndex::Iterator(this);
    }

  private:
    // Allocator* const allocator_;  // Allocator used for allocations of nodes
    // Immutable after construction
    Comparator const compare_;

    // VersionNode* const head_;
    // Underlying user-key index; created in the constructor and deleted in
    // the destructor.
    IUserKeyIndex *userKeyIndex_;

    // Diagnostics sink passed to the ROCKS_LOG_* macros.
    Logger *logger_;

    VersionNode *AllocateNode(size_t key_size, int height);

    bool Equal(const char *a, const char *b) const
    {
      return (compare_(a, b) == 0);
    }

    bool Equal(const char *a, const Slice &b) const
    {
      return (compare_(a, b) == 0);
    }

    bool Equal(const char *a, const VersionNode *n) const
    {
      return (compare(n, a) == 0);
    }

    bool Equal(const Slice &a, const VersionNode *n) const
    {
      return (compare(n, a) == 0);
    }

    bool LessThan(const char *a, const char *b) const
    {
      return (compare_(a, b) < 0);
    }

    bool LessThan(const char *a, const Slice &b) const
    {
      return (compare_(a, b) < 0);
    }

    // The node-based overloads below call compare(node, key), so "a < n" is
    // expressed as compare(n, a) > 0.
    bool LessThan(const char *a, const VersionNode *n) const
    {
      return (compare(n, a) > 0);
    }

    bool LessThan(const Slice &s, const VersionNode *n) const
    {
      return (compare(n, s) > 0);
    }

    bool LessThan(const VersionNode *n, const char *a) const
    {
      return (compare(n, a) < 0);
    }

    bool LessThan(const VersionNode *n, const Slice &s) const
    {
      return (compare(n, s) < 0);
    }

    // Return true if key is greater than the data stored in "n".
    // "n" must not be null (asserted); without assertions a null "n"
    // yields false.
    bool KeyIsAfterNode(const char *key, VersionNode *n) const;
    bool KeyIsAfterNode(const DecodedKey &key, VersionNode *n) const;

    // Returns the earliest node with a key >= key.
    // Return nullptr if there is no such node.
    // The *Ref variants also report, through curRef, the per-user-key
    // reference the search ended on (nullptr when nothing was found).
    VersionNode *FindGreaterOrEqual(const char *key) const;
    VersionNode *FindGreaterOrEqualRef(const char *key,
                                       ITreeVoidRef *&curRef) const;
    VersionNode *HashFindGreaterOrEqualRef(const char *key,
                                           ITreeVoidRef *&curRef) const;

    VersionNode *FindGreaterOrEqualRef(const Slice &key,
                                       ITreeVoidRef *&curRef) const;
    VersionNode *HashFindGreaterOrEqualRef(const Slice &key,
                                           ITreeVoidRef *&curRef) const;

    VersionNode *FindGreaterOrEqualRef(const VersionNode &vn,
                                       ITreeVoidRef *&curRef) const;

    // Return the latest node with a key < key.
    // Return nullptr if there is no such node.
    // "prev" is accepted for interface compatibility and is not filled in.
    VersionNode *FindLessThan(const char *key,
                              VersionNode **prev = nullptr) const;

    // Return the last node in the index.
    // Return nullptr if the index holds no node.
    VersionNode *FindLast() const;
    VersionNode *FindLastRef(ITreeVoidRef *&curRef) const;

    // No copying allowed
    InlineUserKeyIndex(const InlineUserKeyIndex &) = delete;
    InlineUserKeyIndex &operator=(const InlineUserKeyIndex &) = delete;

    // Three-way comparisons that dispatch on the node's type
    // (ENCODED / UNCODED) before calling compare_.
    int compare(const VersionNode*, const VersionNode*) const;
    int compare(const VersionNode*, const char*) const;
    int compare(const VersionNode*, const Slice&) const;
  };

  // Three-way comparison of two version nodes.
  //
  // The type of node1 decides which comparator overload is used; node2 is
  // cast to that same node type without any check. Returns the comparator's
  // result, or 0 if node1's type is neither ENCODED nor UNCODED.
  template <class Comparator>
  int InlineUserKeyIndex<Comparator>::compare(const VersionNode* node1,
                                              const VersionNode* node2) const   {
    assert(node1 != nullptr && node2 != nullptr);

    const auto kind = node1->GetNodeType();

    if (kind == VersionNode::NodeType::ENCODED) {
      // Encoded nodes are compared through their encoded key buffers.
      const auto *lhs = static_cast<const EncodedVersionNode*>(node1);
      const auto *rhs = static_cast<const EncodedVersionNode*>(node2);
      return compare_(lhs->Key(), rhs->Key());
    }

    if (kind == VersionNode::NodeType::UNCODED) {
      // Uncoded nodes are compared by key plus sequence number.
      const auto *lhs = static_cast<const UncodedVersionNode*>(node1);
      const auto *rhs = static_cast<const UncodedVersionNode*>(node2);
      return compare_(lhs->GetKey(), rhs->GetKey(),
                      lhs->GetSequenceNumber(),
                      rhs->GetSequenceNumber());
    }

    return 0;
  }

  // Three-way comparison of a version node against a key given as a raw
  // character pointer.
  //
  // Returns the comparator's result, or 0 if the node's type is neither
  // ENCODED nor UNCODED.
  template <class Comparator>
  int InlineUserKeyIndex<Comparator>::compare(const VersionNode* node,
                                              const char* key) const {
    assert(node != nullptr && key != nullptr);

    const auto kind = node->GetNodeType();

    if (kind == VersionNode::NodeType::ENCODED) {
      const auto *encoded = static_cast<const EncodedVersionNode*>(node);
      return compare_(encoded->Key(), key);
    }

    if (kind == VersionNode::NodeType::UNCODED) {
      const auto *uncoded = static_cast<const UncodedVersionNode*>(node);
      return compare_(uncoded->GetKey(), uncoded->GetSequenceNumber(), key);
    }

    return 0;
  }

  // Three-way comparison of a version node against a key given as a Slice.
  //
  // Returns the comparator's result, or 0 if the node's type is neither
  // ENCODED nor UNCODED.
  template <class Comparator>
  int InlineUserKeyIndex<Comparator>::compare(const VersionNode* node,
                                              const Slice& key) const {
    assert(node != nullptr);

    const auto kind = node->GetNodeType();

    if (kind == VersionNode::NodeType::ENCODED) {
      const auto *encoded = static_cast<const EncodedVersionNode*>(node);
      return compare_(encoded->Key(), key);
    }

    if (kind == VersionNode::NodeType::UNCODED) {
      const auto *uncoded = static_cast<const UncodedVersionNode*>(node);
      return compare_(uncoded->GetKey(), uncoded->GetSequenceNumber(), key);
    }

    return 0;
  }

  // Builds an iterator bound to "list". The iterator starts out invalid:
  // both the current node and the current reference are null.
  template <class Comparator>
  inline InlineUserKeyIndex<Comparator>::Iterator::Iterator(
      const InlineUserKeyIndex *list)
      : list_(list), node_(nullptr), artRef_(nullptr)
  {
  }

  // Rebinds the iterator to "list" and drops its position, leaving it
  // invalid until the next seek.
  template <class Comparator>
  inline void InlineUserKeyIndex<Comparator>::Iterator::SetList(
      const InlineUserKeyIndex *list)
  {
    // Forget the old position first, then switch to the new index.
    this->artRef_ = nullptr;
    this->node_ = nullptr;
    this->list_ = list;
  }

  // An iterator is valid only when it has both a current node and the
  // reference that node was reached through.
  template <class Comparator>
  inline bool InlineUserKeyIndex<Comparator>::Iterator::Valid() const
  {
    if (node_ == nullptr)
    {
      return false;
    }
    return artRef_ != nullptr;
  }

  // Returns the key of the current node, as reported by the node's Key().
  // REQUIRES: Valid()
  template <class Comparator>
  inline const char *InlineUserKeyIndex<Comparator>::Iterator::key() const
  {
    assert(Valid());
    const char *currentKey = this->node_->Key();
    return currentKey;
  }

  // Returns the version node the iterator is currently positioned on.
  // REQUIRES: Valid()
  template <class Comparator>
  inline VersionNode *InlineUserKeyIndex<Comparator>::Iterator::node() const
  {
    assert(Valid());
    return this->node_;
  }

  // Moves to the following node. When the current content list is exhausted
  // the iterator hops to the next reference(s) until one with a non-empty
  // content list is found; if none is left, the iterator becomes invalid.
  // REQUIRES: Valid()
  template <class Comparator>
  inline void InlineUserKeyIndex<Comparator>::Iterator::Next()
  {
    assert(Valid());

    node_ = node_->Next();
    if (node_ != nullptr)
    {
      return; // still inside the same content list
    }

    do
    {
      artRef_ = artRef_->Next();
      if (artRef_ == nullptr)
      {
        return; // ran off the end of the index
      }
      node_ = artRef_->getContentList();
    } while (node_ == nullptr);
  }

  // Moves to the preceding node. When the current node has no predecessor
  // the iterator hops to the previous reference(s) and positions itself on
  // the last node of the first non-empty content list it finds; if none is
  // left, the iterator becomes invalid.
  // REQUIRES: Valid()
  template <class Comparator>
  inline void InlineUserKeyIndex<Comparator>::Iterator::Prev()
  {
    assert(Valid());

    node_ = node_->Prev();
    while (node_ == nullptr)
    {
      artRef_ = artRef_->Prev();
      if (artRef_ == nullptr)
      {
        return; // ran off the front of the index
      }
      // Walk the whole content list; node_ ends up on its last element, or
      // stays null when the list is empty.
      for (VersionNode *walker = artRef_->getContentList(); walker != nullptr;
           walker = walker->Next())
      {
        node_ = walker;
      }
    }
  }

  // Positions the iterator on the first entry with a key >= target, where
  // target is a raw key pointer. The lookup also updates artRef_.
  template <class Comparator>
  inline void InlineUserKeyIndex<Comparator>::Iterator::Seek(const char *target)
  {
    VersionNode *found = this->list_->FindGreaterOrEqualRef(target, this->artRef_);
    this->node_ = found;
  }

  // Positions the iterator on the first entry with a key >= key, where key
  // is given as a Slice. The lookup also updates artRef_.
  template <class Comparator>
  inline void InlineUserKeyIndex<Comparator>::Iterator::Seek(const Slice &key)
  {
    VersionNode *found = this->list_->FindGreaterOrEqualRef(key, this->artRef_);
    this->node_ = found;
  }

  // Positions the iterator on the first entry that is >= the given version
  // node. The lookup also updates artRef_.
  template <class Comparator>
  inline void InlineUserKeyIndex<Comparator>::Iterator::Seek(const VersionNode &vn)
  {
    VersionNode *found = this->list_->FindGreaterOrEqualRef(vn, this->artRef_);
    this->node_ = found;
  }


  // Unlinks the current node from the content list of the current reference,
  // advances the iterator to the following entry, and returns the unlinked
  // node.
  //
  // The unlink is done with compare-and-swap and retried when a CAS fails.
  // Returns nullptr (after logging at FATAL level) if the current node cannot
  // be found in the content list.
  // REQUIRES: the iterator has a current reference and a current node.
  template <class Comparator>
  void *InlineUserKeyIndex<Comparator>::Iterator::PushCurNode()
  {
    assert(artRef_ != nullptr);
    assert(node_ != nullptr); // dereferenced below
    VersionNode *beDeleteNode = node_, *prev, *next;
    while (true)
    {
      prev = artRef_->getContentList();
      if (prev == beDeleteNode)
      {
        // The node is the head of the list: swing the head to its successor.
        next = beDeleteNode->Next();
        if (!artRef_->CASContentList(beDeleteNode, next))
        {
          continue;
        }
        // The successor is the new head, so it must no longer point back at
        // the removed node; Iterator::Prev() relies on a null Prev link to
        // detect the front of a content list.
        if (next != nullptr)
        {
          VersionNode *noPrev = nullptr;
          next->CASPrev(beDeleteNode, noPrev);
        }
        break;
      }

      // Otherwise look for the node whose successor is the one to remove.
      bool findCurNode = false;
      while (prev != nullptr)
      {
        next = prev->Next();
        if (next == beDeleteNode)
        {
          findCurNode = true;
          break;
        }
        prev = next;
      }
      if (!findCurNode)
      {
        ROCKS_LOG_FATAL(list_->logger_, "cannot find current node in content list.");
        return nullptr;
      }

      // Splice the node out: prev -> next, then fix next's back link.
      next = beDeleteNode->Next();
      if (!prev->CASNext(beDeleteNode, next))
      {
        continue;
      }
      if (next != nullptr)
        next->CASPrev(beDeleteNode, prev);
      break;
    }

    // node_ still refers to the removed node, whose own Next link was left
    // untouched, so Next() moves on to the entry that followed it.
    this->Next();
    return beDeleteNode;
  }

  // Positions the iterator on the last entry with a key <= target.
  //
  // It first seeks to the first entry >= target (or to the last entry when
  // there is none) and then steps backwards while target is still smaller
  // than the current entry.
  template <class Comparator>
  inline void
  InlineUserKeyIndex<Comparator>::Iterator::SeekForPrev(const char *target)
  {
    Seek(target);
    if (!Valid())
    {
      SeekToLast();
    }

    while (Valid())
    {
      if (!list_->LessThan(target, node()))
      {
        break; // current entry is <= target
      }
      Prev();
    }
  }

  // Positions the iterator on the first node of the first reference that has
  // a non-empty content list, starting from the index head. If there is no
  // such reference the iterator ends up invalid.
  template <class Comparator>
  inline void InlineUserKeyIndex<Comparator>::Iterator::SeekToFirst()
  {
    node_ = nullptr;
    for (artRef_ = this->list_->userKeyIndex_->head(); artRef_ != nullptr;
         artRef_ = artRef_->Next())
    {
      node_ = artRef_->getContentList();
      if (node_ != nullptr)
      {
        return;
      }
    }
  }

  // Positions the iterator on the last node of the index; FindLastRef also
  // updates artRef_. The iterator ends up invalid when no node exists.
  template <class Comparator>
  inline void InlineUserKeyIndex<Comparator>::Iterator::SeekToLast()
  {
    VersionNode *last = this->list_->FindLastRef(this->artRef_);
    this->node_ = last;
  }

  // Hash-assisted variant of Seek(const char*): delegates to
  // HashFindGreaterOrEqualRef, which also updates artRef_.
  template <class Comparator>
  void InlineUserKeyIndex<Comparator>::Iterator::HashSeek(const char *target)
  {
    VersionNode *found = this->list_->HashFindGreaterOrEqualRef(target, this->artRef_);
    this->node_ = found;
  }

  // Hash-assisted variant of Seek(const Slice&): delegates to
  // HashFindGreaterOrEqualRef, which also updates artRef_.
  template <class Comparator>
  void InlineUserKeyIndex<Comparator>::Iterator::HashSeek(const Slice &key)
  {
    VersionNode *found = this->list_->HashFindGreaterOrEqualRef(key, this->artRef_);
    this->node_ = found;
  }

  // Hash-assisted variant of SeekForPrev: positions the iterator on the last
  // entry with a key <= target.
  //
  // It first hash-seeks to the first entry >= target (or to the last entry
  // when there is none) and then steps backwards while target is still
  // smaller than the current entry.
  template <class Comparator>
  void InlineUserKeyIndex<Comparator>::Iterator::HashSeekForPrev(const char *target)
  {
    HashSeek(target);
    if (!Valid())
    {
      SeekToLast();
    }

    while (Valid())
    {
      if (!list_->LessThan(target, node()))
      {
        break; // current entry is <= target
      }
      Prev();
    }
  }

  // Slice overload of HashSeekForPrev: positions the iterator on the last
  // entry whose key is strictly less than target.
  //
  // It first hash-seeks to the first entry >= target (or to the last entry
  // when there is none) and then steps backwards while the current entry is
  // not less than target.
  template <class Comparator>
  void InlineUserKeyIndex<Comparator>::Iterator::HashSeekForPrev(const Slice &target)
  {
    HashSeek(target);
    if (!Valid())
    {
      SeekToLast();
    }
    // Note on the difference from the const char* overload: that one stops as
    // soon as the current entry is <= target, whereas this loop keeps going
    // while the entry is >= target, i.e. it stops on a strictly smaller entry
    // (HashSeekForPrevOrEqual is the <= variant for Slice targets).
    // The comparison goes through node() so that it uses the same node-type
    // dispatch as every other seek in this class.
    while (Valid() && !list_->LessThan(node(), target))
    {
      Prev();
    }
  }

  // Positions the iterator on the last entry with a key <= target.
  //
  // It first hash-seeks to the first entry >= target (or to the last entry
  // when there is none) and then steps backwards while the current entry is
  // greater than target.
  template <class Comparator>
  void InlineUserKeyIndex<Comparator>::Iterator::HashSeekForPrevOrEqual(const Slice &target)
  {
    HashSeek(target);
    if (!Valid())
    {
      SeekToLast();
    }

    // "neither equal to nor less than target" is the same as
    // "target is less than the node".
    while (Valid())
    {
      if (!list_->LessThan(target, node()))
      {
        break;
      }
      Prev();
    }
  }

  // Returns true when "key" sorts after the data stored in "n".
  // "n" is expected to be non-null (asserted); without assertions a null
  // "n" yields false.
  template <class Comparator>
  bool InlineUserKeyIndex<Comparator>::KeyIsAfterNode(const char *key,
                                                      VersionNode *n) const
  {
    assert(n != nullptr);
    if (n == nullptr)
    {
      return false;
    }
    return compare(n, key) < 0;
  }

  // Returns true when the decoded "key" sorts after the data stored in "n".
  // "n" is expected to be non-null (asserted); without assertions a null
  // "n" yields false.
  template <class Comparator>
  bool InlineUserKeyIndex<Comparator>::KeyIsAfterNode(const DecodedKey &key,
                                                      VersionNode *n) const
  {
    assert(n != nullptr);
    if (n == nullptr)
    {
      return false;
    }
    return compare(n, key) < 0;
  }

  // Finds the first version node that compares >= "key", where "key" points
  // to a length-prefixed encoded key.
  //
  // curRef receives the reference the search ended on; it is nullptr when
  // nothing was found, in which case the function returns nullptr as well.
  // A warning is logged for every node or empty list that is skipped.
  template <class Comparator>
  VersionNode *InlineUserKeyIndex<Comparator>::FindGreaterOrEqualRef(
      const char *key, ITreeVoidRef *&curRef) const
  {
    Slice buf = GetLengthPrefixedSlice(key);
    ROCKS_LOG_DEBUG(logger_, "input buff: %s", buf.ToString(true).c_str());
    MvccKey mk;
    mk.parseKey(buf);
    ROCKS_LOG_DEBUG(logger_, "parse buff to key: ");
    mk.dump(logger_);

    // Locate the position that satisfies the user-key comparison.
    curRef = userKeyIndex_->getGETreeVoidRef(mk.userKey_);
    if (curRef == nullptr)
    {
      ROCKS_LOG_INFO(logger_, "cannot find artRef for key:%s.", mk.userKey_.ToString(true).c_str());
      return nullptr;
    }

    VersionNode *candidate = curRef->getContentList();
    while (true)
    {
      const bool listExhausted = (candidate == nullptr);
      // Compare the HLC and sequence-number parts.
      if (!listExhausted && compare(candidate, key) >= 0)
      {
        return candidate;
      }

      ROCKS_LOG_WARN(logger_, "compare_(retNode->Key(), key) failed ... retcode: %s",
                     listExhausted ? "is null." : "is not null");

      if (!listExhausted)
      {
        candidate = candidate->Next();
        continue;
      }

      // Nothing under this user key matched on HLC/seqnum: move to the next
      // user key and keep comparing.
      curRef = curRef->Next();
      if (curRef == nullptr)
      {
        return nullptr;
      }
      candidate = curRef->getContentList();
    }
  }

  // Hash-assisted search for the first version node that compares >= "key",
  // where "key" points to a length-prefixed encoded key.
  //
  // The user key is looked up in hashMap_ first; only when it is absent does
  // the search fall back to userKeyIndex_. curRef receives the reference the
  // search ended on; it is nullptr when nothing was found, in which case the
  // function returns nullptr as well. A warning is logged for every node or
  // empty list that is skipped.
  template <class Comparator>
  VersionNode *InlineUserKeyIndex<Comparator>::HashFindGreaterOrEqualRef(
      const char *key, ITreeVoidRef *&curRef) const
  {
    Slice buf = GetLengthPrefixedSlice(key);
    ROCKS_LOG_DEBUG(logger_, "input buff: %s", buf.ToString(true).c_str());
    MvccKey mk;
    mk.parseKey(buf);
    ROCKS_LOG_DEBUG(logger_, "parse buff to key: ");
    mk.dump(logger_);

    bool foundInHash = false;
    {
      // Insert() modifies hashMap_ while holding mtx_, so the lookup has to
      // hold the same mutex; an unsynchronised find() racing with insert()
      // on std::unordered_map is a data race. mtx_ is a non-const member of
      // an object accessed here through a const "this", hence the cast.
      auto *self = const_cast<InlineUserKeyIndex *>(this);
      std::lock_guard<std::mutex> guard(self->mtx_);
      auto uMapIter = hashMap_.find(mk.userKey_.ToString());
      if (uMapIter != hashMap_.end())
      {
        curRef = uMapIter->second;
        foundInHash = true;
      }
    }
    if (!foundInHash)
    {
      // Locate the position that satisfies the user-key comparison.
      curRef = userKeyIndex_->getGETreeVoidRef(mk.userKey_);
    }
    if (curRef == nullptr)
    {
      ROCKS_LOG_INFO(logger_, "cannot find artRef for key:%s", mk.userKey_.ToString(true).c_str());
      return nullptr;
    }

    VersionNode *retNode = curRef->getContentList();
    while (retNode == nullptr ||
           compare(retNode, key) < 0)
    { // Compare the HLC and sequence-number parts.
      ROCKS_LOG_WARN(logger_, "compare_(retNode->Key(), key) failed ... retcode: %s",
          retNode == nullptr ? "is null" : "is not null");

      if (retNode == nullptr)
      {
        // Nothing under this user key matched on HLC/seqnum: move to the
        // next user key and keep comparing.
        curRef = curRef->Next();
        if (curRef == nullptr)
          return nullptr;
        retNode = curRef->getContentList();
      }
      else
      {
        retNode = retNode->Next();
      }
    }

    return retNode;
  }

  // Finds the first version node that compares >= "key", where "key" is the
  // encoded key given as a Slice.
  //
  // curRef receives the reference the search ended on; it is nullptr when
  // nothing was found, in which case the function returns nullptr as well.
  // A warning is logged for every node or empty list that is skipped.
  template <class Comparator>
  VersionNode *InlineUserKeyIndex<Comparator>::FindGreaterOrEqualRef(
      const Slice &key, ITreeVoidRef *&curRef) const
  {
    ROCKS_LOG_DEBUG(logger_, "input buff: %s", key.ToString(true).c_str());
    MvccKey mk;
    mk.parseKey(key);
    mk.dump(logger_);

    // Locate the position that satisfies the user-key comparison.
    curRef = userKeyIndex_->getGETreeVoidRef(mk.userKey_);
    if (curRef == nullptr)
    {
      ROCKS_LOG_INFO(logger_, "cannot find artRef for key:%s", mk.userKey_.ToString(true).c_str());
      return nullptr;
    }

    VersionNode *candidate = curRef->getContentList();
    while (true)
    {
      const bool listExhausted = (candidate == nullptr);
      // Compare the HLC and sequence-number parts.
      if (!listExhausted && compare(candidate, key) >= 0)
      {
        return candidate;
      }

      ROCKS_LOG_WARN(logger_, "compare_(retNode->Key(), key) failed ... retcode: %s",
          listExhausted ? "is null" : " not null");

      if (!listExhausted)
      {
        candidate = candidate->Next();
        continue;
      }

      // Nothing under this user key matched on HLC/seqnum: move to the next
      // user key and keep comparing.
      curRef = curRef->Next();
      if (curRef == nullptr)
      {
        return nullptr;
      }
      candidate = curRef->getContentList();
    }
  }

  // Finds the first version node that compares >= the given version node.
  //
  // REQUIRES: "vn" is an UNCODED node; it is cast to UncodedVersionNode to
  // read its MvccKey.
  // curRef receives the reference the search ended on; it is nullptr when
  // nothing was found, in which case the function returns nullptr as well.
  template <class Comparator>
  VersionNode *InlineUserKeyIndex<Comparator>::FindGreaterOrEqualRef(
      const VersionNode &vn, ITreeVoidRef *&curRef) const {
    // The downcast below is only valid for uncoded nodes; make the
    // precondition explicit instead of silently reading a wrong layout.
    assert(vn.GetNodeType() == VersionNode::NodeType::UNCODED);
    auto &versionNode = static_cast<const UncodedVersionNode&>(vn);
    const MvccKey &mk = versionNode.GetMvccKey();

    // Locate the position that satisfies the user-key comparison.
    curRef = userKeyIndex_->getGETreeVoidRef(mk.userKey_);
    if (curRef == nullptr) {
      return nullptr;
    }

    VersionNode *retNode = curRef->getContentList();
    while (retNode == nullptr ||
           compare(retNode, &versionNode) < 0) { // Compare the HLC and sequence-number parts.
      if (retNode == nullptr) {
        // Nothing under this user key matched on HLC/seqnum: move to the
        // next user key and keep comparing.
        curRef = curRef->Next();
        if (curRef == nullptr)
          return nullptr;
        retNode = curRef->getContentList();
      } else {
        retNode = retNode->Next();
      }
    }

    return retNode;
  }

  // Hash-assisted search for the first version node that compares >= "key",
  // where "key" is the encoded key given as a Slice.
  //
  // The user key is looked up in hashMap_ first; only when it is absent does
  // the search fall back to userKeyIndex_. curRef receives the reference the
  // search ended on; it is nullptr when nothing was found, in which case the
  // function returns nullptr as well. A warning is logged for every node or
  // empty list that is skipped.
  template <class Comparator>
  VersionNode *InlineUserKeyIndex<Comparator>::HashFindGreaterOrEqualRef(const Slice &key, ITreeVoidRef *&curRef) const
  {
    ROCKS_LOG_DEBUG(logger_, "input buff: %s", key.ToString(true).c_str());
    MvccKey mk;
    mk.parseKey(key);
    mk.dump(logger_);

    // Look in the hash table first; if the user key is not there, look it up
    // in the underlying index.
    bool foundInHash = false;
    {
      // Insert() modifies hashMap_ while holding mtx_, so the lookup has to
      // hold the same mutex; an unsynchronised find() racing with insert()
      // on std::unordered_map is a data race. mtx_ is a non-const member of
      // an object accessed here through a const "this", hence the cast.
      auto *self = const_cast<InlineUserKeyIndex *>(this);
      std::lock_guard<std::mutex> guard(self->mtx_);
      // The map is keyed by the raw bytes of the user key (plain ToString(),
      // not the hex form produced by ToString(true)).
      auto uMapIter = hashMap_.find(mk.userKey_.ToString());
      if (uMapIter != hashMap_.end())
      {
        curRef = uMapIter->second;
        foundInHash = true;
      }
    }
    if (!foundInHash)
    {
      // Locate the position that satisfies the user-key comparison.
      curRef = userKeyIndex_->getGETreeVoidRef(mk.userKey_);
    }
    if (curRef == nullptr)
    {
      ROCKS_LOG_INFO(logger_, "cannot find artRef for key:%s", mk.userKey_.ToString(true).c_str());
      return nullptr;
    }

    VersionNode *retNode = curRef->getContentList();
    while (retNode == nullptr || compare(retNode, key) < 0)
    { // Compare the HLC and sequence-number parts.

      ROCKS_LOG_WARN(logger_, "compare_(retNode->Key(), key) failed ... retcode: %s",
          retNode == nullptr ? "is null" : "not null");
      if (retNode == nullptr)
      {
        // Nothing under this user key matched on HLC/seqnum: move to the
        // next user key and keep comparing.
        curRef = curRef->Next();
        if (curRef == nullptr)
          return nullptr;
        retNode = curRef->getContentList();
      }
      else
      {
        retNode = retNode->Next();
      }
    }

    return retNode;
  }

  // Convenience wrapper around FindGreaterOrEqualRef for callers that do not
  // need the reference the search ended on.
  template <class Comparator>
  VersionNode *
  InlineUserKeyIndex<Comparator>::FindGreaterOrEqual(const char *key) const
  {
    ITreeVoidRef *ignoredRef = nullptr;
    VersionNode *found = this->FindGreaterOrEqualRef(key, ignoredRef);
    return found;
  }

  // Returns the latest node with a key < "key", or nullptr if there is no
  // such node. "key" points to a length-prefixed encoded key.
  //
  // "prev" is accepted for interface compatibility and is not filled in.
  template <class Comparator>
  VersionNode *
  InlineUserKeyIndex<Comparator>::FindLessThan(const char *key,
                                               VersionNode **prev) const
  {
    (void)prev; // not used by this index

    Slice buf = GetLengthPrefixedSlice(key);
    MvccKey mk;
    mk.parseKey(buf);

    ITreeVoidRef *curRef = this->userKeyIndex_->getGETreeVoidRef(mk.userKey_);
    if (curRef == nullptr)
    {
      // The lookup found no starting reference (the other callers of
      // getGETreeVoidRef treat this as "nothing at or after the key"), so
      // search backwards from the tail, as the SeekForPrev family does via
      // SeekToLast(). Dereferencing the null reference here would crash.
      curRef = this->userKeyIndex_->tail();
    }

    // Walk references backwards until one starts with a node < key; the
    // answer is then the last node < key inside that content list.
    while (curRef != nullptr)
    {
      VersionNode *preNode = curRef->getContentList();
      if (preNode != nullptr && compare(preNode, key) < 0)
      {
        VersionNode *nextNode = preNode->Next();
        while (nextNode != nullptr && compare(nextNode, key) < 0)
        {
          preNode = nextNode;
          nextNode = preNode->Next();
        }
        return preNode;
      }
      // Empty list, or even its first node is >= key: try the previous one.
      curRef = curRef->Prev();
    }
    return nullptr;
  }

  // Returns the last node of the index, i.e. the final node of the last
  // reference (searching backwards from the tail) whose content list is not
  // empty. curRef receives that reference, or nullptr when no node exists,
  // in which case the function returns nullptr.
  template <class Comparator>
  VersionNode *
  InlineUserKeyIndex<Comparator>::FindLastRef(ITreeVoidRef *&curRef) const
  {
    for (curRef = this->userKeyIndex_->tail(); curRef != nullptr;
         curRef = curRef->Prev())
    {
      // Run to the end of this content list, remembering the last node seen.
      VersionNode *lastSeen = nullptr;
      for (VersionNode *walker = curRef->getContentList(); walker != nullptr;
           walker = walker->Next())
      {
        lastSeen = walker;
      }
      if (lastSeen != nullptr)
      {
        return lastSeen;
      }
    }
    return nullptr;
  }
  // Convenience wrapper around FindLastRef for callers that do not need the
  // reference the last node belongs to.
  template <class Comparator>
  VersionNode *InlineUserKeyIndex<Comparator>::FindLast() const
  {
    ITreeVoidRef *ignoredRef = nullptr;
    VersionNode *last = this->FindLastRef(ignoredRef);
    return last;
  }

  // Stores the comparator and logger and creates the underlying
  // UserKeyIndex from the requested index type and reference type.
  template <class Comparator>
  InlineUserKeyIndex<Comparator>::InlineUserKeyIndex(const Comparator cmp,
                                                     UserKeyIndexType type, ITreeVoidRef::RefType refType, Logger *loger)
      : compare_(cmp),
        userKeyIndex_(new UserKeyIndex(type, refType)),
        logger_(loger)
  {
  }

  // Exists only to satisfy the memtable container interface. Calling it
  // trips an assertion; without assertions it returns nullptr.
  template <class Comparator>
  char *InlineUserKeyIndex<Comparator>::AllocateKey(size_t key_size)
  {
    assert(false); // should not run here.
    char *const noStorage = nullptr;
    return noStorage;
  }

  // Forwards to Insert(); there is no separate concurrent code path.
  template <class Comparator>
  bool InlineUserKeyIndex<Comparator>::InsertConcurrently(const char *key)
  {
    const bool inserted = this->Insert(key);
    return inserted;
  }

  // Forwards to Insert(); the hint is accepted for interface compatibility
  // and is neither read nor written.
  template <class Comparator>
  bool InlineUserKeyIndex<Comparator>::InsertWithHint(const char *key,
                                                      void **hint)
  {
    const bool inserted = this->Insert(key);
    return inserted;
  }

  // Inserts the version node that "key" points to ("key" is reinterpreted as
  // a VersionNode*) into the content list of its user key, keeping that list
  // in ascending order.
  //
  // Steps:
  //   1. reset the node's links and derive its MvccKey (parsed from the
  //      encoded key for ENCODED nodes, read directly for UNCODED nodes);
  //   2. obtain the per-user-key reference from userKeyIndex_ and register
  //      it in hashMap_ under mtx_;
  //   3. link the node into the reference's content list with
  //      compare-and-swap, retrying from the list head whenever a CAS fails.
  //
  // Returns true when the node is in the list on return (including the case
  // where it is found to be linked already), false when a different node
  // comparing equal is already present.
  template <class Comparator>
  bool InlineUserKeyIndex<Comparator>::Insert(const char *key)
  {
    auto x =
        const_cast<VersionNode *>(reinterpret_cast<const VersionNode *>(key));
    x->SetNext(nullptr);
    x->SetPrev(nullptr);

    MvccKey mk;
    auto nodeType = x->GetNodeType();
    switch (nodeType) {
      case VersionNode::NodeType::ENCODED:{
        assert(static_cast<EncodedVersionNode*>(x)->Key());
        Slice buf = GetLengthPrefixedSlice(static_cast<EncodedVersionNode*>(x)->Key());
        assert(buf.size() > 0);
        mk.parseKey(buf);
        break;
      }
      case VersionNode::NodeType::UNCODED:{
        mk = static_cast<UncodedVersionNode*>(x)->GetMvccKey();
        break;
      }
    }

    ITreeVoidRef *curRef = this->userKeyIndex_->getTreeVoidRef(mk.userKey_, x);
    assert(curRef != nullptr);

    // Add hash index for userKey. An existing entry for the same user key is
    // left as it is. The guard is scoped so the mutex is released before the
    // CAS loop starts.
    {
      std::lock_guard<std::mutex> guard(mtx_);
      this->hashMap_.emplace(mk.userKey_.ToString(), curRef);
    }

    while (true)
    { // Lock-free insertion with CAS; start over whenever a CAS fails.
      VersionNode *preNode = curRef->getContentList();
      if (preNode == nullptr)
      { // This user key has no value yet.
        // x may still carry links written by an earlier failed attempt.
        x->SetNext(nullptr);
        x->SetPrev(nullptr);
        if (!curRef->CASContentList(nullptr, x))
          continue;
        return true;
      }
      if (preNode == x)
      { // The reference was just created and already holds the node.
        return true;
      }
      int compp = compare(preNode, x);
      if (compp == 0)
      {
        // duplicate key
        return false;
      }
      if (compp > 0)
      { // The list is ascending and the new key is the smallest, so it
        // becomes the new head. A head has no predecessor; clear any Prev
        // link left over from an earlier failed attempt.
        x->SetPrev(nullptr);
        x->SetNext(preNode);
        if (!curRef->CASContentList(preNode, x))
          continue;
        preNode->SetPrev(x);
        return true;
      }

      VersionNode *nextNode = preNode->Next();
      while (nextNode != nullptr)
      { // Find the predecessor and successor for the new key.
        if (nextNode == x)
        { // x is already linked: other threads may have inserted nodes into
          // this reference after x was placed in it.
          return true;
        }
        int cmp = compare(nextNode, x);
        if (cmp == 0)
        {
          // duplicate key
          return false;
        }
        if (cmp > 0)
        {
          break;
        }
        preNode = nextNode;
        nextNode = nextNode->Next();
      }

      // Prepare x's links first, then publish it through the predecessor.
      x->SetNext(nextNode);
      x->SetPrev(preNode);

      if (!preNode->CASNext(nextNode, x))
      {
        continue;
      }
      if (nextNode != nullptr)
        nextNode->CASPrev(preNode, x);
      return true;
    }
  }

  // Returns true iff the index holds an entry that compares equal to "key":
  // the first entry >= key must exist and must compare equal.
  template <class Comparator>
  bool InlineUserKeyIndex<Comparator>::Contains(const char *key) const
  {
    VersionNode *candidate = FindGreaterOrEqual(key);
    return candidate != nullptr && Equal(key, candidate);
  }

} // namespace rocksdb
